package arp.message.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import arp.process.publish.Message;
import arp.process.publish.ProcessMessageSender;

public class KafkaMessageSender implements ProcessMessageSender {

	private KafkaProducer<String, Object> producer;

	private Properties props;

	private KafkaMessageSerializationStrategy<Object> serializationStrategy;

	public KafkaMessageSender(String servers) {
		this(servers, new FSTSerializationStrategy());
	}

	public KafkaMessageSender(String servers,
			KafkaMessageSerializationStrategy<?> serializationStrategy) {
		this.serializationStrategy = (KafkaMessageSerializationStrategy<Object>) serializationStrategy;

		props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
				StringSerializer.class);
		serializationStrategy.configValueDeserializerClass(props);
		props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
		producer = new KafkaProducer<>(props);
	}

	@Override
	public void send(Message msg) throws Exception {
		ProducerRecord<String, Object> record = serializationStrategy
				.serialize("arp_process_message", msg);
		producer.send(record).get();
	}

}
